{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "view-in-github"
      },
      "source": [
        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_transforms_by_doing.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "QgmD1wbmT4mj"
      },
      "outputs": [],
      "source": [
        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "RuUHlGZjVt6W"
      },
      "source": [
        "# Learn Beam PTransforms\n",
        "\n",
        "After this notebook, you should be able to:\n",
        "1. Use user-defined functions in your `PTransforms`\n",
        "2. Learn Beam SDK composite transforms\n",
        "3. Create you own composite transforms to simplify your `Pipeline`\n",
        "\n",
        "For basic Beam `PTransforms`, please check out [this Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).\n",
        "\n",
        "Beam Python SDK also provides [a list of built-in transforms](https://beam.apache.org/documentation/transforms/python/overview/).\n"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "Ldx0Z7nWGopE"
      },
      "source": [
        "## How To Approach This Tutorial\n",
        "\n",
        "This tutorial is designed for someone who likes to learn by doing. There will be code cells where you can write your own code to test your understanding.\n",
        "\n",
        "As such, to get the most out of this tutorial, we strongly recommend typing code by hand as you’re working through the tutorial and not using copy/paste. This will help you develop muscle memory and a stronger understanding."
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Prerequisites\n",
        "\n",
        "We'll assume you have familiarity with Python or Pandas. It is highly recommended to finish [this beginner tutorial](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb) first. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "jy1zaj4NDE0T"
      },
      "source": [
        "To begin, run the cell below to install and import Apache Beam."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "pNure-fW8hl3"
      },
      "outputs": [],
      "source": [
        "# Run a shell command and import beam\n",
        "!pip install --quiet apache-beam\n",
        "import apache_beam as beam\n",
        "beam.__version__"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "vyksB2VMtv3m"
      },
      "outputs": [],
      "source": [
        "# Set the logging level to reduce verbose information\n",
        "import logging\n",
        "\n",
        "logging.root.setLevel(logging.ERROR)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "M1ku4nX_Gutb"
      },
      "source": [
        "\n",
        "\n",
        "---\n",
        "\n",
        "\n",
        "\n",
        "---\n",
        "\n"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "0qDeT34SS1_8"
      },
      "source": [
        "## 1. Simple User-Defined Function (UDF)\n",
        "\n",
        "Some `PTransforms` allow you to run your own functions and user-defined code to specify how your transform is applied. \n",
        "For example, the below [`CombineGlobally`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/) transform \n",
        "defines a custom `bounded_sum` function to aggregate the elements,"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "UZTWBGZ0TQWF"
      },
      "outputs": [],
      "source": [
        "pc = [1, 10, 100, 1000]\n",
        "\n",
        "# User-defined function\n",
        "def bounded_sum(values, bound=500):\n",
        "  return min(sum(values), bound)\n",
        "\n",
        "small_sum = pc | beam.CombineGlobally(bounded_sum)  # [500]\n",
        "large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000)  # [1111]\n",
        "\n",
        "print(small_sum, large_sum)"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "UBFRcPO06xiV"
      },
      "source": [
        "## 2. Transforms: ParDo and Combine\n",
        "\n",
        "A `ParDo` transform considers each element in the input `PCollection`, performs your user code to process each element, and emits zero, one, or multiple elements to an output `PCollection`. `Combine` is another Beam transform for combining collections of elements or values in your data.\n",
        "Both allow flexible UDFs to define how you process the data."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "P4W-1HIiV-HP"
      },
      "source": [
        "### 2.1 DoFn\n",
        "\n",
        "DoFn - a Beam Python class that defines a distributed processing function (used in [ParDo](https://beam.apache.org/documentation/programming-guide/#pardo))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "TjOzWnQd-dan"
      },
      "outputs": [],
      "source": [
        "data = [1, 2, 3, 4]\n",
        "\n",
        "# create a DoFn to multiply each element by five\n",
        "# you can define the processing code under `process`\n",
        "# which is required for a DoFn\n",
        "class MultiplyByFive(beam.DoFn):\n",
        "  def process(self, element):\n",
        "    yield element*5\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create values' >> beam.Create(data)\n",
        "      | 'Multiply by 5' >> beam.ParDo(MultiplyByFive())\n",
        "  )\n",
        "\n",
        "  outputs | beam.Map(print)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "1qL2crwNXQXe"
      },
      "source": [
        "### 2.2 CombineFn\n",
        "\n",
        "CombineFn - define associative and commutative aggregations (used in [Combine](https://beam.apache.org/documentation/programming-guide/#combine))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "zxLatbOa9FyA"
      },
      "outputs": [],
      "source": [
        "data = [1, 2, 3, 4]\n",
        "\n",
        "# create a CombineFn to get the product of each element\n",
        "# you need to provide four operations\n",
        "class ProductFn(beam.CombineFn):\n",
        "  def create_accumulator(self):\n",
        "    # creates a new accumulator to store the initial value\n",
        "    return 1\n",
        "\n",
        "  def add_input(self, current_prod, input):\n",
        "    # adds an input element to an accumulator\n",
        "    return current_prod*input\n",
        "\n",
        "  def merge_accumulators(self, accumulators):\n",
        "    # merge several accumulators into a single accumulator\n",
        "    prod = 1\n",
        "    for accu in accumulators:\n",
        "      prod *= accu\n",
        "    return prod\n",
        "\n",
        "  def extract_output(self, prod):\n",
        "    # performs the final computation\n",
        "    return prod\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create values' >> beam.Create(data)\n",
        "      | 'Multiply by 2' >> beam.CombineGlobally(ProductFn())\n",
        "  )\n",
        "  outputs | beam.LogElements()\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "r1Vw1d5vJoIE"
      },
      "source": [
        "Note: The above `DoFn` and `CombineFn` examples are for demonstration purposes. You could easily achieve the same functionality by using the simple function illustrated in section 1."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "bTer_URwS0wb"
      },
      "source": [
        "\n",
        "\n",
        "---\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qPnSfU5wLTN5"
      },
      "source": [
        "## 3. Composite Transforms\n",
        "\n",
        "Now that you've learned the basic `PTransforms`, Beam allows you to simplify the process of processing and transforming your data through [Composite Transforms](https://beam.apache.org/documentation/programming-guide/#composite-transforms).\n",
        "\n",
        "Composite transforms can nest multiple transforms into a single composite transform, making your code easier to understand."
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "4tBsLkeatNUU"
      },
      "source": [
        "To see an example of this, let's take a look at how we can improve the `Pipeline` we built to count each word in Shakespeare's *King Lear*."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Vokbrhhyto7H"
      },
      "outputs": [],
      "source": [
        "!mkdir -p data\n",
        "!gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "R-4uyn0Tttr2"
      },
      "outputs": [],
      "source": [
        "import re\n",
        "\n",
        "# Function used to run and display the result\n",
        "def run(cmd):\n",
        "  print('>> {}'.format(cmd))\n",
        "  !{cmd}\n",
        "  print('')\n",
        "\n",
        "inputs_pattern = 'data/*'\n",
        "outputs_prefix = 'outputs/part'\n",
        "\n",
        "# Running locally in the DirectRunner.\n",
        "with beam.Pipeline() as pipeline:\n",
        "  word_count = (\n",
        "      pipeline\n",
        "        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
        "        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line))\n",
        "        | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
        "        | 'Group and sum' >> beam.CombinePerKey(sum)\n",
        "        | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
        "  )\n",
        "\n",
        "# Sample the first 20 results, remember there are no ordering guarantees.\n",
        "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "wl8wLwZZtbnX"
      },
      "source": [
        "Although the code above is a viable way to design your `Pipeline`, you can see that we use multiple transforms to perform one process:\n",
        "1. `FlatMap` is used to find words in each line\n",
        "2. `Map` is used to create key-value pairs with each word where the value is 1\n",
        "3. `CombinePerKey` is used so that we can then group by each word and count up the sums\n",
        "\n",
        "All of these `PTransforms`, in combination, are meant to count each word in *King Lear*. You can simplify the process and combine these three transforms into one by using composite transforms.\n",
        "\n",
        "There's two ways you can follow:\n",
        "1. Using Beam SDK's built-in composite transforms\n",
        "2. Creating your own composite transforms"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "nPG11AEkNMKK"
      },
      "source": [
        "### 3.1 Beam SDK Composite Transforms\n",
        "Beam allows combining a sequence of transforms into a composite transform. \n",
        "Many of the Beam's handy pre-written transforms are composite transforms under the hood. \n",
        "In this tutorial, we will cover one example of how to create a composite transform. \n",
        "However, to see other composite transforms you can use, \n",
        "check out the following API reference pages: [Beam Transforms Package](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.html), [Beam ML Package](https://beam.apache.org/releases/pydoc/current/apache_beam.ml)."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "CG3lYxgqsXNk"
      },
      "source": [
        "By using a Beam SDK composite transform, you're able to easily combine multiple transforms into one line.\n",
        "\n",
        "For this tutorial, we will use the SDK-provided [`Count` transform](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.combiners.html#apache_beam.transforms.combiners.Count), which counts each element in the `PCollection`.\n",
        "\n",
        "\n",
        "```\n",
        "beam.combiners.Count.PerElement()\n",
        "```\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "JQvwxTOHjOnr"
      },
      "source": [
        "This `Count` transform performs the work that both the `Map` and `CombinePerKey` transforms from our Word Count `Pipeline` but do it in one line.\n",
        "\n",
        "Edit the Word Count `Pipeline` below to use a composite transform by implementing Beam's `Count` transform (see above). Applying a composite transform is just like applying a `PTransform` to your `PCollection`.\n",
        "\n",
        "Below the code cell you will edit is a hidden answer code cell to check your work. If you're stuck, try opening the hint first!"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "lHmq9azUmldL"
      },
      "outputs": [],
      "source": [
        "#@title Open code to show the hint\n",
        "\n",
        "#Hint: Replace the `Map` and `CombinePerKey` transforms with Beam's `Count` transform (see above)*italicized text*"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "QEE8nhc_lhPB"
      },
      "outputs": [],
      "source": [
        "#@title EDIT THIS CODE CELL TO USE beam.combiners.Count.PerElement\n",
        "# EDIT THIS CODE CELL\n",
        "\n",
        "inputs_pattern = 'data/*'\n",
        "outputs_prefix = 'outputs/userans'\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  word_count = (\n",
        "      pipeline\n",
        "        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
        "        | 'Find words' >>\n",
        "        beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line))\n",
        "        | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
        "        | 'Group and sum' >> beam.CombinePerKey(sum)\n",
        "        | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
        "  )\n",
        "\n",
        "# After you're done, check to see if your code outputs\n",
        "# the same PCollection by uncommenting the code below\n",
        "'''\n",
        "# Sample the first 20 results, remember there are no ordering guarantees.\n",
        "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))\n",
        "'''"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "TW4rLroMIMPe"
      },
      "source": [
        "Below is our answer to check your work. It is the Word Count example from above, but they now combine `Map` and `CombinePerKey` into one line using the `Count` composite transform."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "T-utf0YLIvEe"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "inputs_pattern = 'data/*'\n",
        "outputs_prefix = 'outputs/part2'\n",
        "\n",
        "# Running locally in the DirectRunner.\n",
        "with beam.Pipeline() as pipeline:\n",
        "  word_count = (\n",
        "      pipeline\n",
        "        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
        "        | 'Find words' >>\n",
        "        beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line))\n",
        "        # Count composite transform from Beam SDK\n",
        "        | 'Count words' >> beam.combiners.Count.PerElement()\n",
        "        | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
        "  )\n",
        "\n",
        "# Sample the first 20 results, remember there are no ordering guarantees.\n",
        "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FlcAgn0HlP3v"
      },
      "source": [
        "> Summary: Applying a composite transform is just like applying a `PTransform` to your `PCollection`, but it simplifies the process by combining multiple `PTransforms` in one line."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IoENeJ1kMndv"
      },
      "source": [
        "### 3.2 Creating Your Own Composite Transform\n",
        "\n",
        "We simplified the original code using a Beam SDK composite transform, but we can simplify it further by creating our own composite transform function."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "e0Og38YMQ0hV"
      },
      "source": [
        "Below is an example of a composite transform you can create that the Beam SDK does not cover. The function combines the `Count` composite transform you implemented above, as well as the `FlatMap` transform that converts lines of texts into individual words.\n",
        "\n",
        "Note that because `Count` is itself a composite transform, `CountWords` is also a nested composite transform."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "cvkGb-QcQyD3"
      },
      "outputs": [],
      "source": [
        "# The CountWords Composite Transform inside the WordCount pipeline.\n",
        "@beam.ptransform_fn\n",
        "def CountWords(pcoll):\n",
        "  return (\n",
        "      pcoll\n",
        "      # Convert lines of text into individual words.\n",
        "      | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\\']+', x))\n",
        "      # Count the number of times each word occurs.\n",
        "      | beam.combiners.Count.PerElement()\n",
        "  )"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "OJCZKYxGNAfU"
      },
      "source": [
        "You can then use this `CountWords` composite transform in your `Pipeline`, making your pipeline more visually easy to parse through.\n",
        "\n",
        "Try editing the Word Count `Pipeline` below to incoporate this transform into the pipeline.\n",
        "\n",
        "Below the code cell you will edit is a hidden answer code cell to check your work. If you're stuck, try opening the hint first!"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "9XO-uwhL8w27"
      },
      "outputs": [],
      "source": [
        "#@title Open code to show the hint\n",
        "\n",
        "#Hint: The newly defined transform combines the Count and FlatMap transform.\n",
        "#Replace the `FlatMap` and `Count` transforms with CountWords() (see above)*italicized text*"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "FyW5ug6S3-1B"
      },
      "outputs": [],
      "source": [
        "#@title EDIT THIS CODE CELL TO USE YOUR `CountWords`\n",
        "# EDIT THIS CODE CELL\n",
        "\n",
        "inputs_pattern = 'data/*'\n",
        "outputs_prefix = 'outputs/part3'\n",
        "\n",
        "# Running locally in the DirectRunner.\n",
        "with beam.Pipeline() as pipeline:\n",
        "  word_count = (\n",
        "      pipeline\n",
        "        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
        "        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line))\n",
        "        | 'Count words' >> beam.combiners.Count.PerElement()\n",
        "        | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
        "  )\n",
        "\n",
        "pipeline.run()\n",
        "# Sample the first 20 results, remember there are no ordering guarantees.\n",
        "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "6Cn4pk3j8K4p"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "inputs_pattern = 'data/*'\n",
        "outputs_prefix = 'outputs/part3'\n",
        "\n",
        "# Running locally in the DirectRunner.\n",
        "with beam.Pipeline() as pipeline:\n",
        "  word_count = (\n",
        "      pipeline\n",
        "        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
        "        # The composite transform function you created above\n",
        "        | 'Count Words' >> CountWords()\n",
        "        | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
        "  )\n",
        "\n",
        "# Sample the first 20 results, remember there are no ordering guarantees.\n",
        "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "w73aU2IsDAS1"
      },
      "source": [
        "### 3.3 Creating Your Own Composite Transform With `PTransform` Directly"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "uLEAICZKDz6e"
      },
      "source": [
        "To create your own composite transform, create a subclass of the `PTransform` class and override the `expand` method to specify the actual processing logic\n",
        "([more details](https://beam.apache.org/documentation/programming-guide/#composite-transform-creation))."
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "JT_ZtzjIEHhD"
      },
      "source": [
        "For example, if we wanted to create our own composite transform that counted the length of each word.\n",
        "\n",
        "The following code sample shows how to declare a `PTransform` that accepts a `PCollection` of Strings for input, and outputs a `PCollection` of integers\n",
        "to show the string lengths."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "2H1EKs9YDVKF"
      },
      "outputs": [],
      "source": [
        "class ComputeWordLengths(beam.PTransform):\n",
        "  def expand(self, pcoll):\n",
        "    # Transform logic goes here.\n",
        "    return pcoll | beam.Map(lambda x: (x, len(x)))"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Within the above `PTransform` subclass, you’ll need to override the `expand` method. The `expand` method is where you add the processing logic for the `PTransform`. \n",
        "Your override of `expand` must accept the appropriate type of input `PCollection` as a parameter, and specify the output `PCollection` as the return value.\n",
        "\n",
        "As long as you override the `expand` method in your `PTransform` subclass to accept the appropriate input `PCollection`(s) and \n",
        "return the corresponding output `PCollection`(s), you can include as many transforms as you want. \n",
        "These transforms can include core transforms (`ParDo`), composite transforms, or the transforms included in the Beam SDK libraries.\n",
        "\n",
        "Your composite transform’s parameters and return value must match the initial input type and final return type for the entire transform, even if the transform’s intermediate data changes type multiple times.\n",
        "\n",
        "Note: The `expand` method of a `PTransform` is not meant to be invoked directly by the user of a transform. \n",
        "Instead, you should call the apply method on the PCollection itself, with the transform as an argument. \n",
        "This allows transforms to be nested within the structure of your pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "3cfiUXTGR4Uk"
      },
      "outputs": [],
      "source": [
        "# quickly test it works\n",
        "[\"KING\", \"OF\"] | ComputeWordLengths()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "CUpZFaTwSPRH"
      },
      "outputs": [],
      "source": [
        "#@title Click to check how to use your composite transform to build the pipeline\n",
        "\n",
        "# put this into the Beam pipeline to compute the length of each word\n",
        "inputs_pattern = 'data/*'\n",
        "outputs_prefix = 'outputs/part33'\n",
        "\n",
        "# Running locally in the DirectRunner.\n",
        "with beam.Pipeline() as pipeline:\n",
        "  word_count = (\n",
        "      pipeline\n",
        "        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
        "        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line))\n",
        "        | 'Count Word Lengths' >> ComputeWordLengths()\n",
        "        | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
        "  )\n",
        "\n",
        "# Sample the first 20 results, remember there are no ordering guarantees.\n",
        "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FSszoOt4Irme"
      },
      "source": [
        "## Final Reading\n",
        "\n",
        "The PTransform Style Guide contains additional information not included here, such as style guidelines, logging and testing guidance, and language-specific considerations. The guide is a useful starting point when you want to write new composite PTransforms.\n",
        "\n",
        "https://beam.apache.org/contribute/ptransform-style-guide/"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
